/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "orc/OrcFile.hh"

#include "Adaptor.hh"
#include "orc/Exceptions.hh"

#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>

#include "hdfspp/hdfspp.h"

namespace orc {

  class HdfsFileInputStream : public InputStream {
  private:
    std::string filename;
    std::unique_ptr<hdfs::FileHandle> file;
    std::unique_ptr<hdfs::FileSystem> file_system;
    uint64_t totalLength;
    const uint64_t READ_SIZE = 1024 * 1024; //1 MB

  public:
    HdfsFileInputStream(std::string _filename) {
      filename = _filename ;

      //Building a URI object from the given uri_path
      hdfs::URI uri;
      try {
        uri = hdfs::URI::parse_from_string(filename);
      } catch (const hdfs::uri_parse_error&) {
        throw ParseError("Malformed URI: " + filename);
      }

      //This sets conf path to default "$HADOOP_CONF_DIR" or "/etc/hadoop/conf"
      //and loads configs core-site.xml and hdfs-site.xml from the conf path
      hdfs::ConfigParser parser;
      if(!parser.LoadDefaultResources()){
        throw ParseError("Could not load default resources. ");
      }
      auto stats = parser.ValidateResources();
      //validating core-site.xml
      if(!stats[0].second.ok()){
        throw ParseError(stats[0].first + " is invalid: " + stats[0].second.ToString());
      }
      //validating hdfs-site.xml
      if(!stats[1].second.ok()){
        throw ParseError(stats[1].first + " is invalid: " + stats[1].second.ToString());
      }
      hdfs::Options options;
      if(!parser.get_options(options)){
        throw ParseError("Could not load Options object. ");
      }
      hdfs::IoService * io_service = hdfs::IoService::New();
      //Wrapping file_system into a unique pointer to guarantee deletion
      file_system = std::unique_ptr<hdfs::FileSystem>(
          hdfs::FileSystem::New(io_service, "", options));
      if (file_system.get() == nullptr) {
        throw ParseError("Can't create FileSystem object. ");
      }
      hdfs::Status status;
      //Checking if the user supplied the host
      if(!uri.get_host().empty()){
        //Using port if supplied, otherwise using "" to look up port in configs
        std::string port = uri.has_port() ?
            std::to_string(uri.get_port()) : "";
        status = file_system->Connect(uri.get_host(), port);
        if (!status.ok()) {
          throw ParseError("Can't connect to " + uri.get_host()
              + ":" + port + ". " + status.ToString());
        }
      } else {
        status = file_system->ConnectToDefaultFs();
        if (!status.ok()) {
          if(!options.defaultFS.get_host().empty()){
            throw ParseError("Error connecting to " +
                options.defaultFS.str() + ". " + status.ToString());
          } else {
            throw ParseError(
                "Error connecting to the cluster: defaultFS is empty. "
                + status.ToString());
          }
        }
      }

      if (file_system.get() == nullptr) {
        throw ParseError("Can't connect the file system. ");
      }

      hdfs::FileHandle *file_raw = nullptr;
      status = file_system->Open(uri.get_path(), &file_raw);
      if (!status.ok()) {
        throw ParseError("Can't open "
            + uri.get_path() + ". " + status.ToString());
      }
      //Wrapping file_raw into a unique pointer to guarantee deletion
      file.reset(file_raw);

      hdfs::StatInfo stat_info;
      status = file_system->GetFileInfo(uri.get_path(), stat_info);
      if (!status.ok()) {
        throw ParseError("Can't stat "
            + uri.get_path() + ". " + status.ToString());
      }
      totalLength = stat_info.length;
    }

    uint64_t getLength() const override {
      return totalLength;
    }

    uint64_t getNaturalReadSize() const override {
      return READ_SIZE;
    }

    void read(void* buf,
              uint64_t length,
              uint64_t offset) override {

      if (!buf) {
        throw ParseError("Buffer is null");
      }

      hdfs::Status status;
      size_t total_bytes_read = 0;
      size_t last_bytes_read = 0;

      do {
        status = file->PositionRead(buf,
            static_cast<size_t>(length) - total_bytes_read,
            static_cast<off_t>(offset + total_bytes_read), &last_bytes_read);
        if(!status.ok()) {
          throw ParseError("Error reading the file: " + status.ToString());
        }
        total_bytes_read += last_bytes_read;
      } while (total_bytes_read < length);
    }

    const std::string& getName() const override {
      return filename;
    }

    ~HdfsFileInputStream() override;
  };

  // Out-of-line destructor definition; it has no work of its own because the
  // file handle and file system are owned by unique_ptr members.
  HdfsFileInputStream::~HdfsFileInputStream() = default;

  std::unique_ptr<InputStream> readHdfsFile(const std::string& path) {
    return std::unique_ptr<InputStream>(new HdfsFileInputStream(path));
  }
}
